-
Notifications
You must be signed in to change notification settings - Fork 641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add SNS connector with publish sink #204 #205
Conversation
fgrutsch
commented
Feb 25, 2017
- Includes a new connector called sns
- Includes a sink to publish messages to an AWS SNS topic
REFS #204 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments added. Looks promising!
build.sbt
Outdated
@@ -77,6 +77,14 @@ lazy val simpleCodecs = project | |||
name := "akka-stream-alpakka-simple-codecs" | |||
) | |||
|
|||
lazy val sns = project | |||
.in(file("sns")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the folder is the same name as the project/module, you don't have to specify .in(file(...))
it should be magically correct just by calling project
(see module above for example)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, I will try it out.
docs/src/main/paradox/sns.md
Outdated
@@ -0,0 +1,77 @@ | |||
# AWS SNS Connector | |||
|
|||
The AWS SNS connector provides an Akka Stream sink for AWS SNS. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good with just a tad more about what SNS is, "provides an Akka Stream sink for push notifications through AWS SNS", so that a reader who doesn't know doesn't have to go to the official documentation directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I will add some more information.
|
||
import scala.concurrent.{Future, Promise} | ||
|
||
final class SnsPublishSinkStage(topicArn: String, snsClient: AmazonSNSAsync) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the AmazonSNSAsync
thread safe? A stage can be materialized any number of times and all of them will share the same snsClient
. If it cannot be shared a factory lambda is better (() => AmazonSNSAsync
), that way every materialization can have its own client (which it will be responsible to manage/close etc in sync with the stage lifecycle).
Another option might be to take a AmazonSNSAsyncClientBuilder
and .build()
that each materialization (if the builder is immutable that is).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An additional thought: It could be better to make this a flow, emitting the values downstream as they have been ack:ed from AWS, the factory for the simple use case as sink could then be achieved through composition (SNSPublishFlow.to(Sink.ignore)
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes the AmazonSNSAsync is ThreadSafe regarding to the AWS docs. That's why I think it is fine to share a single AmazonSNSAsync among multiple materialization stages.
Also regarding your last comment about not making the AmazonSNSAsync implicit, I try to be consistent with other connectors, like the SQS one. There we also pass the AmazonSQSClient implicitly. I think it will be easier for people who use the connectors if the initialization of the GraphStages work the same way.
Good point to change the Sink to a Flow. I thought about keeping the Sink and provide an additional Flow GraphStage which will return the messageId of the published message, so clients can do further processing with the returned messageId. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, Hadn't noticed that how many connectors that do that. Thats a valid argument to keep it implicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for verifying that the client is threadsafe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe even just let it emit PublishResult downstream and then both String => String, and String => MessageId can be achieved through combinations with other stages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the SnsPublishFlow to emit a PublishResult.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means we could delete the sink stage and express it by using the flow in the factory method instead:
val flow: Flow[String, PublishResult, NotUsed]
val sink: Sink[String, Future[Done]] = flow.toMat(Sink.ignore)(Keep.right)
} | ||
|
||
def handleFailure(ex: Throwable): Unit = { | ||
log.error(ex, "AmazonSNSAsync failure: {}", ex.getMessage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to log the failure since we fail the stage and the materialized value with it. You could wrap it with a en exception to decorate with extra information from the stage though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright I will remove it then.
publishRequest, | ||
new PublishResult().withMessageId("message-id") | ||
) | ||
new CompletableFuture() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This future is never completed though, shouldn't that cause the matVal
to never complete, is there a bug around waiting for ack on publishing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will further investigate it.
@johanandren I updated the PR regarding your comments. Furthemrore, I also added a SnsPublishFlow what I mentioned in one of my replies of your comments. |
val request = new PublishRequest().withTopicArn(topicArn).withMessage(grab(in)) | ||
|
||
snsClient.publishAsync(request, | ||
new AsyncHandler[PublishRequest, PublishResult] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not necessarily anything to act on but, as this gets the values passed in on success/failure, it could be a single instance of AsyncHandler
that you reuse rather than a new instance for every message.
} | ||
|
||
override def onPull(): Unit = { | ||
if (isClosed(in)) completeStage() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There could be an ongoing publish that did not yet complete here, even if in was closed, you need to keep track of that and emit that last publish result when it comes, and then complete the stage.
|
||
import scala.compat.java8.FutureConverters._ | ||
|
||
object SnsPublishSink { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should then instead be SnsPublisher
and have both .sink
and .flow
in the same place?
@johanandren Updated regarding your comments. Removed the SnsPublishSink and provide a SnsPublisher with .flow and .sink factory methods. Also implemented a way to keep track of in flight messages for proper stage closing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @fg-devs !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking good, but some questions around the push/pull logic
log.debug("Published SNS message: {}", result.getMessageId) | ||
inFlight -= 1 | ||
if (isAvailable(out)) { | ||
if (!hasBeenPulled(in)) tryPull(in) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why pull
here? shouldn't that only be done from onPull
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this limit the Flow to process elements only one by one? Otherwise we would always have to wait for the output port to pull?
Thats why I also made inFlight as an Int and not a Boolean.
private def handleSuccess(result: PublishResult): Unit = { | ||
log.debug("Published SNS message: {}", result.getMessageId) | ||
inFlight -= 1 | ||
if (isAvailable(out)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when will it not be available
? should this be !isClosed
we are just dropping the message if it's not available
so I want to be sure that we don't do that accidentally
} | ||
|
||
override def onPush(): Unit = { | ||
inFlight += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can inFlight
be > 1? perhaps use boolean
if it can only be 0 or 1
|
||
override def onPull(): Unit = { | ||
if (isClosed(in) && inFlight == 0) completeStage() | ||
if (!hasBeenPulled(in)) tryPull(in) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
else if
import com.amazonaws.services.sns.AmazonSNSAsync | ||
import com.amazonaws.services.sns.model.{PublishRequest, PublishResult} | ||
|
||
final class SnsPublishFlowStage(topicArn: String, snsClient: AmazonSNSAsync) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Internal API?
private[akka]
object SnsPublisher { | ||
|
||
/** | ||
* Java API: creates a [[Flow]] from a [[SnsPublishFlowStage]] for a SNS topic using an [[AmazonSNSAsync]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
users should not care about the implementation SnsPublishFlowStage
Hi there @fg-devs, are you going to have some time to address the small comments made? |
@ktoso Hey, yes I will do the changes today. |
# Conflicts: # build.sbt # docs/src/main/paradox/connectors.md
# Conflicts: # build.sbt
import com.amazonaws.services.sns.AmazonSNSAsync | ||
import com.amazonaws.services.sns.model.{PublishRequest, PublishResult} | ||
|
||
private[akka] final class SnsPublishFlowStage(topicArn: String, snsClient: AmazonSNSAsync) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the client have a lifecycle, is is thread safe, should this be a client factory instead (() => AmazonSNSAsync
) and have its lifecycle managed by the stage (`client.close() when the stage stops)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NVM, I already asked that previously but github hid all comments and I had forgot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!